有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

JavaSpring集成聚合器

我想使用聚合器从两条消息中创建一条消息,但我不知道如何做到这一点

目前,我正在从一个目录中读取两个文件,并希望将这些消息聚合到一个目录中

我的整个项目如下所示:

读入。拉链->;传递给将其解压到目录中的自定义消息处理程序->;从该目录读取文件->;试着把它们聚合起来

如果我能在解压文件后发送一条带有两个有效载荷的消息,那就太好了,但是在阅读后聚合就足够了

我的拉链看起来像这样:

public class ZipHandler extends AbstractMessageHandler {

File dat;
File json;

@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
    byte[] buffer = new byte[1024];
    try {
        File file = (File) message.getPayload();
        ZipFile zip = new ZipFile(file);

        for (Enumeration<? extends ZipEntry> entries = zip.entries(); entries
                .hasMoreElements();) {
            ZipEntry ze = entries.nextElement();
            String name = ze.getName();

            if (name.endsWith(".dat") || name.endsWith(".DAT")) {
                InputStream input = zip.getInputStream(ze);
                File datFile = new File("D:/lrtrans/zipOut"
                        + File.separator + name);
                FileOutputStream fos = new FileOutputStream(datFile);
                int len;
                while ((len = input.read(buffer)) > 0) {
                    fos.write(buffer, 0, len);
                }
                this.dat = datFile;
                fos.close();
            } else if (name.endsWith(".json") || name.endsWith(".JSON")) {
                InputStream input = zip.getInputStream(ze);
                File jsonFile = new File("D:/lrtrans/zipOut"
                        + File.separator + name);
                FileOutputStream fos = new FileOutputStream(jsonFile);
                int len;
                while ((len = input.read(buffer)) > 0) {
                    fos.write(buffer, 0, len);
                }
                this.json = jsonFile;
                fos.close();
            }
        }
        zip.close();
    } catch (Exception e) {
        e.printStackTrace();
    }

}
}

它将这些文件放入两个目录中,我使用FileReadingMessageSource再次从中读取它们。 我还想只使用基于注释的表示法而不是xml来解决这个问题

编辑:

我只想使用DefaultAggregatingMessageGroupProssor和基于标题名为“zip”的CorrelationsStrategy以及基于消息的releaseStrategy,因为在这种情况下,两个文件应该组合成一个

@Aggregator(inputChannel = "toAggregatorChannel", outputChannel = "toRouterChannel", discardChannel = "nullChannel")
public DefaultAggregatingMessageGroupProcessor aggregate(){
    DefaultAggregatingMessageGroupProcessor aggregator = new DefaultAggregatingMessageGroupProcessor(); 
    return aggregator;
}
@CorrelationStrategy 
public String correlateBy(@Header("zipFile") String zip){
    return zip;
}
@ReleaseStrategy
public boolean isReadytoRelease(List<Message<?>> messages) {
    return messages.size() == 2;
}

共 (1) 个答案

  1. # 1 楼答案

    我得说你走对了路。由于zip中有多个文件,因此正确的要求是将其解压并将这些文件收集为一条消息,然后发送以供进一步处理

    所以,是的,<aggregator>是给你的。只需要确定如何关联和分组它们

    不知道如何解压,但确实可以使用zip文件名作为correlationKey,并使用大量文件作为组大小来确定释放组的信号

    请随时提出更多问题。但首先我要看看你的“拉链”

    更新

    首先,基于注释的聚合器配置有点有限,最好在AggregatingMessageHandler{}上使用@ServiceActivator,以便更好地控制其选项

    然而,即使您选择了它,您也可以实现您的需求。但是@Aggregator配置应该遵循POJO方法调用原则:

    @Aggregator(inputChannel = "toAggregatorChannel", outputChannel = "toRouterChannel", discardChannel = "nullChannel")
    public List<File> aggregate(List<File> files){
        return files;
    }
    

    差不多吧